package com.guo.mixframe.framework.receivers.rpc

import com.guo.mixframe.framework.configs.CoreConfig
import com.guo.mixframe.framework.exception.CoreException
import com.guo.mixframe.framework.log.LogPrint
import com.guo.mixframe.framework.protocol.RpcReqHeader
import com.guo.mixframe.framework.protocol.RpcRespHeader
import com.guo.mixframe.framework.urls.URLMapping
import com.guo.mixframe.framework.utils.IPAddrHelper
import com.guo.mixframe.framework.utils.ProtobufUtils
import com.guo.mixframe.framework.utils.StringHelper
import io.netty.buffer.ByteBuf
import io.netty.buffer.PooledByteBufAllocator
import io.vertx.core.AbstractVerticle
import io.vertx.core.AsyncResult
import io.vertx.core.Handler
import io.vertx.core.Vertx
import io.vertx.core.buffer.Buffer
import io.vertx.core.eventbus.DeliveryOptions
import io.vertx.core.eventbus.Message
import io.vertx.core.eventbus.ReplyException
import io.vertx.core.http.HttpMethod
import io.vertx.core.http.HttpServerOptions
import io.vertx.core.http.HttpServerRequest
import io.vertx.core.http.HttpServerResponse
import io.vertx.ext.web.Router
import io.vertx.ext.web.RoutingContext
import io.vertx.ext.web.handler.BodyHandler
import java.util.*
import java.util.ArrayDeque
import java.util.concurrent.ConcurrentHashMap

/**
 * HTTP front end that accepts RPC calls and forwards them over the Vert.x event bus.
 *
 * Wire format of a request body: a 4-byte little-endian header length, the
 * protobuf-encoded [RpcReqHeader], then the raw message body. Responses use the
 * same framing with an [RpcRespHeader].
 *
 * Requests whose header carries a non-empty `rpcType` are processed strictly one
 * at a time per `rpcType`, in arrival order; requests without an `rpcType` are
 * dispatched immediately.
 *
 * @author  gx
 * @param port TCP port the HTTP server listens on
 * @param vtx Vert.x instance used to create the HTTP server and to reach the event bus
 */
class RpcServer(private val port: Int, private val vtx: Vertx) {

  /** A decoded RPC call together with the HTTP request it arrived on. */
  private data class RpcRequest(
    val url: String,
    val rpcType: String?,
    val options: DeliveryOptions,
    val body: ByteArray,
    val request: HttpServerRequest
  )

  /** Value passed to [HttpServerOptions.setIdleTimeout] when the server is created. */
  var idleTimeout: Int = 1200

  /** Value used as the send timeout of every event-bus request issued by this server. */
  var sendTimeout: Long = 3000

  /** Pending requests per rpcType. Every access must hold [queueLock]. */
  private val rpcRequests = ConcurrentHashMap<String, Queue<RpcRequest>>()

  /**
   * Private monitor guarding [rpcRequests] and the queues inside it. A dedicated
   * object is used instead of interned strings so that no unrelated code can
   * contend on (or deadlock with) the same monitor.
   */
  private val queueLock = Any()

  /**
   * Creates the HTTP server, installs the POST "/" route and starts listening on [port].
   * The outcome of the bind is logged once the listen operation completes.
   */
  fun start() {
    val serverOptions = HttpServerOptions().setIdleTimeout(idleTimeout)
    val server = vtx.createHttpServer(serverOptions)

    val router = Router.router(vtx)
    // URL format is http://xxxx/
    val route = router.route(HttpMethod.POST, "/").handler(BodyHandler.create())

    route.handler { routingContext: RoutingContext ->
      try {
        // Raw message bytes
        val byteBuf = routingContext.body.byteBuf
        // Decode header and body into an RpcRequest
        val request = readRequest(byteBuf, routingContext.request())
        // Validate the url type and the origin of the call
        checkValid(request)
        // Dispatch
        handleReceive(request)
      } catch (e: Exception) {
        // Do not swallow silently: the client only ever sees -1, so the log is the
        // only place where the real cause is visible.
        LogPrint.logger.warn("RPC request rejected", e)
        sendToClient(routingContext.response(), RpcRespHeader(-1), null)
      }
    }

    server.requestHandler(router).listen(port) { result ->
      if (result.succeeded()) {
        LogPrint.logger.debug("RPC Server start on port $port")
      } else {
        LogPrint.logger.error("RPC Server failed to start on port $port", result.cause())
      }
    }
  }

  /**
   * Reads the header and the body from [byteBuf] and builds the [RpcRequest].
   *
   * @throws RuntimeException if the frame is malformed or the header has no url
   */
  private fun readRequest(byteBuf: ByteBuf, request: HttpServerRequest): RpcRequest {
    val header = readHeader(byteBuf)
    // Header values are carried to the consumer through the delivery options
    val options = buildDeliveryOptions(header, request)
    val body = readBody(byteBuf)
    val url = requireNotNull(header.url) { "RPC header has no url" }
    return RpcRequest(url, header.rpcType, options, body, request)
  }

  /**
   * Validates the url and the origin of the request.
   * The url type must be accepted by [CoreConfig] and the remote address must be
   * an inner IP according to [IPAddrHelper.isInnerIP].
   */
  private fun checkValid(request: RpcRequest) {
    val urlType = URLMapping.getUrlTypeByUrl(request.url)
    if (!CoreConfig.getInstance().isUrlTypeValid(urlType)) {
      throw CoreException().throwException(CoreException.InvalidParam)
    }
    val remoteIp = request.request.remoteAddress().host()
    if (!IPAddrHelper.isInnerIP(remoteIp)) {
      throw CoreException().throwException(CoreException.PermissionDenied)
    }
  }

  /**
   * Reads the length-prefixed request header and lower-cases its url.
   *
   * @throws IllegalArgumentException if the declared header length is negative or
   *   exceeds the remaining bytes, or if the decoded header has no url
   */
  private fun readHeader(buffer: ByteBuf): RpcReqHeader {
    // The first 4 bytes hold the header length (little endian)
    val length = buffer.readIntLE()
    // Validate before allocating: the length comes from the wire and must not be
    // allowed to trigger a huge or negative-size allocation.
    val available = buffer.readableBytes()
    require(length in 0..available) { "Invalid RPC header length $length (readable bytes: $available)" }

    val encodedData = ByteArray(length)
    buffer.readBytes(encodedData)
    val header = ProtobufUtils.byteToBean(encodedData, RpcReqHeader::class.java)
    // Urls are matched in lower case
    header.url = requireNotNull(header.url) { "RPC header has no url" }.lowercase()
    return header
  }

  /**
   * Reads all remaining bytes of [buffer] as the message body.
   */
  private fun readBody(buffer: ByteBuf): ByteArray {
    val body = ByteArray(buffer.readableBytes())
    buffer.readBytes(body)
    return body
  }

  /**
   * Builds the delivery options for the event-bus request: the configured send
   * timeout plus the "url" and "ip" headers.
   */
  private fun buildDeliveryOptions(reqHeader: RpcReqHeader, request: HttpServerRequest): DeliveryOptions =
    DeliveryOptions()
      .setSendTimeout(sendTimeout)
      .addHeader("url", reqHeader.url)
      .addHeader("ip", request.remoteAddress().host())

  /**
   * Dispatches a request. Without an rpcType it is processed right away; otherwise
   * it is queued under its rpcType, and the queue is started if it was idle.
   */
  private fun handleReceive(request: RpcRequest) {
    val rpcType = request.rpcType
    if (rpcType == null || StringHelper.isNullOrEmpty(rpcType)) {
      processOneRpcRequest(request, null)
    } else if (addRequest(rpcType, request)) {
      processNextRpcRequest(rpcType)
    }
  }

  /**
   * Appends [request] to the queue of [rpcType].
   *
   * @return true if the queue was empty (or absent) before the insert, meaning the
   *   caller is responsible for starting to drain it; false if a drain is already
   *   in progress
   */
  private fun addRequest(rpcType: String, request: RpcRequest): Boolean {
    synchronized(queueLock) {
      val queue = rpcRequests.getOrPut(rpcType) { ArrayDeque<RpcRequest>() }
      val wasIdle = queue.isEmpty()
      queue.add(request)
      return wasIdle
    }
  }

  /**
   * Returns the head of the [rpcType] queue without removing it, or null if there is none.
   */
  private fun peekGameMessage(rpcType: String): RpcRequest? {
    synchronized(queueLock) {
      return rpcRequests[rpcType]?.peek()
    }
  }

  /**
   * Removes the head of the [rpcType] queue and returns the new head, as one atomic step.
   *
   * Doing both under a single lock acquisition guarantees that a concurrent
   * [addRequest] either sees a non-empty queue (and leaves draining to the current
   * loop) or an empty one after this loop has already decided to stop - never both,
   * so a request cannot be dispatched twice. Drained queues are removed from the
   * map so that it does not grow with every rpcType ever seen.
   */
  private fun pollGameMessage(rpcType: String): RpcRequest? {
    synchronized(queueLock) {
      val queue = rpcRequests[rpcType] ?: return null
      queue.poll()
      val next = queue.peek()
      if (next == null) {
        rpcRequests.remove(rpcType)
      }
      return next
    }
  }

  /**
   * Starts draining the [rpcType] queue from its current head, if any.
   */
  private fun processNextRpcRequest(rpcType: String) {
    val message = peekGameMessage(rpcType) ?: return
    processQueuedRpcRequest(rpcType, message)
  }

  /**
   * Processes [message], which must be the current head of the [rpcType] queue, and
   * continues with the following queued request once it has completed.
   */
  private fun processQueuedRpcRequest(rpcType: String, message: RpcRequest) {
    processOneRpcRequest(message) {
      // Drop the finished request and fetch its successor atomically
      val next = pollGameMessage(rpcType)
      if (next != null) {
        processQueuedRpcRequest(rpcType, next)
      }
    }
  }

  /**
   * Forwards the request over the event bus and writes the reply back to the caller.
   *
   * [callback] is invoked exactly once when processing of this request is over,
   * whether it succeeded, failed, or could not even be dispatched. This guarantee is
   * what keeps a per-rpcType queue from stalling after an unexpected error.
   */
  private fun processOneRpcRequest(request: RpcRequest, callback: Handler<Void>?) {
    val response: HttpServerResponse = request.request.response()
    try {
      vtx.eventBus().request(request.url, request.body, request.options) { resp: AsyncResult<Message<ByteArray?>> ->
        try {
          if (resp.succeeded()) {
            sendToClient(response, RpcRespHeader(true), resp.result().body())
          } else {
            // Only a ReplyException carries a failure code; negative codes and any
            // other kind of failure are reported as a timeout.
            val failureCode = (resp.cause() as? ReplyException)
              ?.failureCode()
              ?.takeIf { it >= 0 }
              ?: CoreException.TimeOut
            sendToClient(response, RpcRespHeader(failureCode.toLong()), null)
          }
        } catch (e: Exception) {
          LogPrint.logger.error("Failed to answer RPC request ${request.url}", e)
        } finally {
          callback?.handle(null)
        }
      }
    } catch (e: Exception) {
      // The request could not be handed to the event bus at all
      LogPrint.logger.error("Failed to dispatch RPC request ${request.url}", e)
      sendToClient(response, RpcRespHeader(-1), null)
      callback?.handle(null)
    }
  }

  /**
   * Writes the result back to the caller.
   * Layout: 4 bytes header length (little endian), the encoded header, then the body.
   *
   * Does nothing if the response has already ended or its connection is closed.
   * Failures while encoding or writing are logged instead of thrown, and the pooled
   * buffer is released on every path.
   */
  private fun sendToClient(response: HttpServerResponse, header: RpcRespHeader, body: ByteArray?) {
    if (response.ended() || response.closed()) {
      LogPrint.logger.debug("RPC response dropped: connection already ended or closed")
      return
    }

    var pending: ByteBuf? = null
    try {
      val encodedData: ByteArray = ProtobufUtils.bean2Byte(header)
      // An empty body is treated the same as no body
      val payload = body?.takeIf { it.isNotEmpty() }
      val dataLength = 4 /* Header length size */ + encodedData.size + (payload?.size ?: 0)

      val buf = PooledByteBufAllocator.DEFAULT.buffer(dataLength)
      pending = buf
      buf.writeIntLE(encodedData.size)
      buf.writeBytes(encodedData)
      if (payload != null) {
        buf.writeBytes(payload)
      }

      response.end(Buffer.buffer(buf)) {
        // The buffer must be released once the write is done, otherwise it leaks
        buf.release()
      }
    } catch (e: Exception) {
      // end() failed before taking over the buffer: release it here instead
      pending?.takeIf { it.refCnt() > 0 }?.release()
      LogPrint.logger.error("Failed to write RPC response", e)
    }
  }
}
